[1/3] sdks/python: refactor Milvus-related utilities as preparation step for Milvus Sink I/O integration #35708
Conversation
stop reviewer notifications
Stopping reviewer notifications for this pull request: requested by reviewer. If you'd like to restart, comment
Regarding dynamic batching, it is perhaps in the nice-to-have category. For now we can safely use the JDBC I/O connector's default batch size (1000).
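For illustration, here is a minimal sketch of what a fixed batch size could look like on the pipeline side, assuming the 1000-element default mentioned above. `beam.BatchElements` is the standard Beam utility for grouping elements into batches; the constant name and the final print step are placeholders, not the actual sink write added in this PR.

```python
import apache_beam as beam

# Assumption: reuse a fixed batch size of 1000, mirroring the JDBC I/O default
# referenced above, instead of implementing dynamic batching for now.
MILVUS_WRITE_BATCH_SIZE = 1000  # hypothetical constant for this sketch

with beam.Pipeline() as p:
  _ = (
      p
      | 'CreateRows' >> beam.Create([{'id': 1, 'embedding': [0.1, 0.2]}])
      # Group incoming elements into fixed-size batches before each write call.
      | 'Batch' >> beam.BatchElements(
          min_batch_size=MILVUS_WRITE_BATCH_SIZE,
          max_batch_size=MILVUS_WRITE_BATCH_SIZE)
      # Placeholder for the actual Milvus write step.
      | 'Write' >> beam.Map(lambda batch: print(f'would write {len(batch)} rows'))
  )
```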
b084802 to 288f245 (Compare)
288f245 to b957194 (Compare)
I will move this PR into a reviewable state after PR #35467 is merged. It refactors and unifies some testing-related code shared by both the Milvus Enrichment Handler and the Sink I/O.
c07bd55 to 5df9628 (Compare)
4512b63 to 91266a7 (Compare)
390e69f to 9445aaa (Compare)
The test apache_beam/ml/inference/pytorch_inference_test.py::PytorchRunInferencePipelineTest::test_gpu_auto_convert_to_cpu failed. 📋 Failure Logs:
[gw5] [ 30%] FAILED apache_beam/ml/inference/pytorch_inference_test.py::PytorchRunInferencePipelineTest::test_gpu_auto_convert_to_cpu
_________ PytorchRunInferencePipelineTest.test_gpu_auto_convert_to_cpu _________
[gw5] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-ml/py39-ml/bin/python
self = <apache_beam.ml.inference.pytorch_inference_test.PytorchRunInferencePipelineTest testMethod=test_gpu_auto_convert_to_cpu>
def test_gpu_auto_convert_to_cpu(self):
"""
This tests the scenario in which the user defines `device='GPU'` for the
PytorchModelHandlerX, but runs the pipeline on a machine without GPU, we
automatically detect this discrepancy and do automatic conversion to CPU.
A warning is also logged to inform the user.
"""
with self.assertLogs() as log:
with TestPipeline() as pipeline:
examples = torch.from_numpy(
np.array([1, 5, 3, 10], dtype="float32").reshape(-1, 1))
state_dict = OrderedDict([('linear.weight', torch.Tensor([[2.0]])),
('linear.bias', torch.Tensor([0.5]))])
path = os.path.join(self.tmpdir, 'my_state_dict_path')
torch.save(state_dict, path)
model_handler = PytorchModelHandlerTensor(
state_dict_path=path,
model_class=PytorchLinearRegression,
model_params={
'input_dim': 1, 'output_dim': 1
},
device='GPU')
# Upon initialization, device is cuda
self.assertEqual(model_handler._device, torch.device('cuda'))
pcoll = pipeline | 'start' >> beam.Create(examples)
# pylint: disable=expression-not-assigned
pcoll | RunInference(model_handler)
# During model loading, device converted to cuda
> self.assertEqual(model_handler._device, torch.device('cuda'))
apache_beam/ml/inference/pytorch_inference_test.py:770:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/pipeline.py:670: in __exit__
self.result = self.run()
target/.tox-py39-ml/py39-ml/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:122: in run
state = result.wait_until_finish(duration=self.timeout)
...
The test apache_beam/yaml/main_test.py::MainTest::test_external_test_specs also failed. 📋 Failure Logs:
[gw3] [ 99%] FAILED apache_beam/yaml/main_test.py::MainTest::test_external_test_specs
______________________ MainTest.test_external_test_specs _______________________
[gw3] linux -- Python 3.9.22 /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/bin/python
self = <apache_beam.yaml.main_test.MainTest testMethod=test_external_test_specs>
def test_external_test_specs(self):
with tempfile.TemporaryDirectory() as tmpdir:
good_suite = os.path.join(tmpdir, 'good.yaml')
with open(good_suite, 'w') as fout:
fout.write(PASSING_TEST_SUITE)
bad_suite = os.path.join(tmpdir, 'bad.yaml')
with open(bad_suite, 'w') as fout:
fout.write(FAILING_TEST_SUITE)
# Must pass.
> main.run_tests([
'--yaml_pipeline',
TEST_PIPELINE,
'--test_suite',
good_suite,
],
exit=False)
apache_beam/yaml/main_test.py:167:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
argv = ['--yaml_pipeline', '\npipeline:\n type: chain\n transforms:\n - type: Create\n config:\n elements: [...ork/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpuubw5yj6/good.yaml']
exit = False
def run_tests(argv=None, exit=True):
known_args, pipeline_args, _, pipeline_yaml = _build_pipeline_yaml_from_argv(
argv)
pipeline_spec = yaml.load(pipeline_yaml, Loader=yaml_transform.SafeLineLoader)
options = _build_pipeline_options(pipeline_spec, pipeline_args)
if known_args.create_test and known_args.fix_tests:
raise ValueError(
'At most one of --create_test and --fix_tests may be specified.')
elif known_args.create_test:
result = unittest.TestResult()
tests = []
else:
if known_args.test_suite:
with open(known_args.test_suite) as fin:
test_suite_holder = yaml.load(
fin, Loader=yaml_transform.SafeLineLoader) or {}
else:
test_suite_holder = pipeline_spec
test_specs = test_suite_holder.get('tests', [])
if not isinstance(test_specs, list):
raise TypeError('tests attribute must be a list of test specifications.')
elif not test_specs:
raise RuntimeError(
'No tests found. '
"If you haven't added a set of tests yet, you can get started by "
'running your pipeline with the --create_test flag enabled.')
tests = [
yaml_testing.YamlTestCase(
pipeline_spec, test_spec, options, known_args.fix_tests)
for test_spec in test_specs
]
suite = unittest.TestSuite(tests)
result = unittest.TextTestRunner().run(suite)
if known_args.fix_tests or known_args.create_test:
update_tests(known_args, pipeline_yaml, pipeline_spec, options, tests)
if exit:
# emulates unittest.main()
sys.exit(0 if result.wasSuccessful() else 1)
else:
if not result.wasSuccessful():
> raise RuntimeError(result)
E RuntimeError: <unittest.runner.TextTestResult run=1 errors=1 failures=0>
apache_beam/yaml/main.py:196: RuntimeError
----------------------------- Captured stderr call -----------------------------
E
======================================================================
ERROR: ExternalTest (line 3)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 51, in runTest
self._fixes = run_test(
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/apache_beam/yaml/yaml_testing.py", line 102, in run_test
_ = p | yaml_transform.YamlTransform(
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 672, in __exit__
self.result.wait_until_finish()
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 571, in wait_until_finish
raise self._runtime_exception
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 580, in _observe_state
for state_response in self._state_stream:
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__
return self._next()
File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2025-11-02T17:37:42.69913264+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
The Publish Test Results CI step in the CI workflow run
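Regarding the first (PyTorch) failure above: a minimal sketch, assuming the fallback works the way the test's docstring describes, of detecting a missing GPU and converting to CPU with a warning. The helper name is hypothetical; the real logic lives in apache_beam.ml.inference.pytorch_inference.

```python
import logging

import torch


def _choose_device(requested: str) -> torch.device:
  """Hypothetical helper mirroring the documented GPU-to-CPU fallback."""
  if requested.upper() == 'GPU':
    if torch.cuda.is_available():
      return torch.device('cuda')
    # Discrepancy between the requested device and the machine: warn and
    # fall back to CPU, as the test's docstring describes.
    logging.warning('GPU requested but CUDA is not available; using CPU.')
  return torch.device('cpu')
```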
Hey @damccorm, I think this PR is now ready for review. I've added the Milvus Sink I/O integration along with unit and integration tests, and did some refactoring by unifying core/testing utilities between the Milvus enrichment handler and sink I/O. All relevant CI tests are passing, and I'd love to receive any feedback when you have time for it.
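For context on what a Milvus sink write ultimately boils down to, here is a minimal, hypothetical sketch using the public pymilvus client directly in a DoFn; it is not the API introduced in this PR, and the URI, collection name, and class name are illustrative assumptions.

```python
import apache_beam as beam
from pymilvus import MilvusClient


class _WriteBatchToMilvus(beam.DoFn):
  """Hypothetical DoFn that inserts one batch of rows per process() call."""
  def __init__(self, uri: str, collection_name: str):
    self._uri = uri
    self._collection_name = collection_name

  def setup(self):
    # One client per worker, reused across bundles.
    self._client = MilvusClient(uri=self._uri)

  def process(self, batch):
    # `batch` is a list of row dicts, e.g. {'id': 1, 'embedding': [...]}.
    self._client.insert(
        collection_name=self._collection_name, data=list(batch))
```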
damccorm left a comment
Thanks! Pausing my review for now since my last comment would cause a major set of changes.
In this commit, we remove that builder method so that the code remains functional and can be used in the next Milvus sink integration PR.
damccorm left a comment
Thanks! It looks like there are now merge conflicts, but other than that and my one comment this looks good.
0ee038c to 0c00044 (Compare)
@damccorm – I've addressed the recent comments and moved the Jupyter notebook update to after the Beam version is released (as in #35708 (comment)). The relevant CI tests are passing, and the failing CI tests are unrelated to the changeset this PR introduces. I think the PR is now ready for another look.
Change Description
Towards #35046.
Depends on #35920.
Next #36729.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
- Update CHANGES.md with noteworthy changes.

See the Contributor Guide for more tips on how to make the review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.